/*
 * Copyright 2019 ACINQ SAS
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package fr.acinq.eclair.blockchain.bitcoind.rpc

import fr.acinq.bitcoin.psbt.Psbt
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat._
import fr.acinq.bitcoin.{Block, SigHash}
import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.blockchain.OnChainWallet
import fr.acinq.eclair.blockchain.OnChainWallet.{FundTransactionResponse, MakeFundingTxResponse, OnChainBalance, ProcessPsbtResponse}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient.AddressType.P2wpkh
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.crypto.keymanager.OnChainKeyManager
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{BlockHeight, TimestampSecond, TxCoordinates}
import grizzled.slf4j.Logging
import org.json4s.Formats
import org.json4s.JsonAST._
import scodec.bits.ByteVector

import java.util.Base64
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.{Failure, Success, Try}

/**
 * Created by PM on 26/04/2016.
 */

/**
 * The Bitcoin Core client provides some high-level utility methods to interact with Bitcoin Core.
 *
 * @param rpcClient             bitcoin core JSON rpc client
 * @param lockUtxos             if true, the utxos selected by bitcoin core when funding a transaction are locked
 * @param onChainKeyManager_opt optional on-chain key manager. If provided, it will be used to sign transactions (it is assumed that bitcoin
 *                              core uses a watch-only wallet with descriptors generated by Eclair with this on-chain key manager)
 */
class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient, val lockUtxos: Boolean = true, val onChainKeyManager_opt: Option[OnChainKeyManager] = None) extends OnChainWallet with Logging {

  import BitcoinCoreClient._

  // json4s formats picked up implicitly by the `extractOpt` / `extractOrElse` calls made in this class.
  implicit val formats: Formats = org.json4s.DefaultFormats

  // When an on-chain key manager is provided, the wallet configured on the rpc client must be the wallet declared by
  // that key manager: construction fails with an IllegalArgumentException (thrown by `require`) otherwise.
  onChainKeyManager_opt.foreach { keyManager =>
    require(rpcClient.wallet.contains(keyManager.walletName), s"eclair-backed bitcoin wallet mismatch: eclair-signer.conf uses wallet=${keyManager.walletName}, but eclair.conf uses wallet=${rpcClient.wallet.getOrElse("")}")
  }

  // True when an on-chain key manager was provided.
  val useEclairSigner = onChainKeyManager_opt.nonEmpty

  //------------------------- TRANSACTIONS  -------------------------//

  /** Fetch the raw (hex-encoded) transaction with the given id and decode it. */
  def getTransaction(txid: TxId)(implicit ec: ExecutionContext): Future[Transaction] =
    for {
      hex <- getRawTransaction(txid)
    } yield Transaction.read(hex)

  /** Call `getrawtransaction` and return the string it answers with; the future fails for any other kind of json answer. */
  private def getRawTransaction(txid: TxId)(implicit ec: ExecutionContext): Future[String] = {
    val answer = rpcClient.invoke("getrawtransaction", txid)
    answer.collect { case JString(hex) => hex }
  }

  /**
   * Fetch a transaction together with the `mediantime` reported by `getblockchaininfo`.
   * The transaction is optional: any failure to fetch it is mapped to None instead of failing the whole call.
   */
  def getTransactionMeta(txid: TxId)(implicit ec: ExecutionContext): Future[GetTxWithMetaResponse] = {
    val lookup: Future[Option[Transaction]] = getTransaction(txid).map(Some(_)).recover { case _ => None }
    lookup.flatMap { tx_opt =>
      rpcClient.invoke("getblockchaininfo").map { info =>
        val JInt(medianTime) = info \ "mediantime"
        GetTxWithMetaResponse(txid, tx_opt, TimestampSecond(medianTime.toLong))
      }
    }
  }

  /**
   * Get the number of confirmations of a given transaction.
   *
   * @return None when bitcoin core answers with error code -5, otherwise the `confirmations` field of the verbose
   *         transaction (0 when that field cannot be extracted).
   */
  def getTxConfirmations(txid: TxId)(implicit ec: ExecutionContext): Future[Option[Int]] = {
    // Verbose output (second argument set to 1) is needed to get the number of confirmations.
    val verboseTx = rpcClient.invoke("getrawtransaction", txid, 1)
    verboseTx
      .map { json =>
        val confirmations = (json \ "confirmations").extractOrElse[Int](0)
        Some(confirmations)
      }
      .recover {
        // Invalid or non-wallet transaction id (code: -5)
        case t: JsonRPCError if t.error.code == -5 => None
      }
  }

  /**
   * Get the hash of the block containing a given transaction.
   *
   * @return None when bitcoin core answers with error code -5 or when the verbose transaction has no `blockhash` field.
   */
  private def getTxBlockId(txid: TxId)(implicit ec: ExecutionContext): Future[Option[BlockId]] = {
    // Verbose output (second argument set to 1) is needed to get the block hash.
    val verboseTx = rpcClient.invoke("getrawtransaction", txid, 1)
    verboseTx
      .map { json =>
        val blockHashHex_opt = (json \ "blockhash").extractOpt[String]
        blockHashHex_opt.map(hex => BlockId(ByteVector32.fromValidHex(hex)))
      }
      .recover {
        // Invalid or non-wallet transaction id (code: -5)
        case t: JsonRPCError if t.error.code == -5 => None
      }
  }

  /**
   * Locate a transaction inside the block that contains it.
   *
   * @return a Future[height, index] where height is the height of the block where this transaction was published, and
   *         index is the position of the transaction id in that block's `tx` list (as returned by `indexOf`).
   */
  def getTransactionShortId(txid: TxId)(implicit ec: ExecutionContext): Future[(BlockHeight, Int)] =
    for {
      Some(blockId) <- getTxBlockId(txid)
      block <- rpcClient.invoke("getblock", blockId)
    } yield {
      val JInt(height) = block \ "height"
      val JArray(txids) = block \ "tx"
      val position = txids.indexOf(JString(txid.value.toHex))
      (BlockHeight(height.toInt), position)
    }

  /**
   * Return true if this output can potentially be spent, i.e. if `gettxout` returns something for it.
   *
   * Note that a negative answer doesn't mean the output cannot be spent: the output could be unknown (not in the
   * blockchain nor in the mempool) but could reappear later and be spendable at that point. If you want to ensure that
   * an output is not spendable anymore, you should use [[isTransactionOutputSpent]].
   */
  def isTransactionOutputSpendable(txid: TxId, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] =
    rpcClient.invoke("gettxout", txid, outputIndex, includeMempool).map(txOut => txOut != JNull)

  /**
   * Return true if this output has already been spent by a confirmed transaction.
   * Note that a reorg may invalidate the result of this function and make a spent output spendable again.
   */
  def isTransactionOutputSpent(txid: TxId, outputIndex: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
    getTxConfirmations(txid).flatMap { confirmations_opt =>
      val isConfirmed = confirmations_opt.exists(_ > 0)
      if (isConfirmed) {
        // A negative answer from isTransactionOutputSpendable is ambiguous, it can mean that:
        //  - the input has been spent
        //  - the input is coming from an unconfirmed transaction (in the mempool) but can be unspent
        //  - the input is unknown (it may come from an unconfirmed transaction that we don't have in our mempool)
        //
        // The only way to make sure that our output has been spent is to verify that it is coming from a confirmed
        // transaction (checked above) and that it has been spent by another confirmed transaction: this is why we
        // ignore the mempool here.
        isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(spendable => !spendable)
      } else {
        // If the output itself isn't in the blockchain, it cannot be spent by a confirmed transaction.
        Future.successful(false)
      }
    }
  }

  /**
   * Check whether a transaction has been double-spent.
   *
   * @param tx transaction to check.
   * @return false if the transaction can be found by bitcoin core, otherwise true if at least one of its inputs has
   *         been spent by a confirmed transaction. The returned future fails with a RuntimeException if bitcoin core
   *         reports an error mentioning its index while we look for the transaction.
   */
  def doubleSpent(tx: Transaction)(implicit ec: ExecutionContext): Future[Boolean] =
    for {
      exists <- getTransaction(tx.txid)
        .map(_ => true) // we have found the transaction
        .recover {
          case JsonRPCError(Error(_, message)) if message.contains("index") =>
            // bitcoind is indexing, that's a fatal error: we cannot tell whether the transaction exists or not.
            // NB: sys.error throws a RuntimeException, which fails the returned future (the JVM is not stopped).
            sys.error("Fatal error: bitcoind is indexing!!")
          case _ => false
        }
      doubleSpent <- if (exists) {
        // if the tx is in the blockchain or in the mempool, it can't have been double-spent
        Future.successful(false)
      } else {
        // The only way to make sure that our transaction has been double-spent is to find an input that is coming from
        // a confirmed transaction and that it has been spent by another confirmed transaction.
        //
        // Note that if our transaction only had unconfirmed inputs and the transactions creating those inputs have
        // themselves been double-spent, we will never be able to consider our transaction double-spent. With the
        // information we have, these unknown inputs could eventually reappear and the transaction could be broadcast
        // again.
        Future.sequence(tx.txIn.map(txIn => isTransactionOutputSpent(txIn.outPoint.txid, txIn.outPoint.index.toInt))).map(_.contains(true))
      }
    } yield doubleSpent

  /**
   * Search for a mempool transaction spending a given output, using `gettxspendingprevout`.
   * The returned future fails with a RuntimeException when no spending transaction id is reported.
   */
  def lookForMempoolSpendingTx(txid: TxId, outputIndex: Int)(implicit ec: ExecutionContext): Future[Transaction] = {
    val spendingTxIds = rpcClient.invoke("gettxspendingprevout", Seq(OutpointArg(txid, outputIndex))).collect {
      case JArray(results) => results.flatMap(result => (result \ "spendingtxid").extractOpt[String].map(TxId.fromValidHex))
    }
    spendingTxIds.flatMap {
      // We only look at the first spending transaction reported.
      case spendingTxId :: _ => getTransaction(spendingTxId)
      case Nil => Future.failed(new RuntimeException(s"mempool doesn't contain any transaction spending $txid:$outputIndex"))
    }
  }

  /**
   * Walk the blockchain backwards, one block at a time, to find the transaction that has spent a given output.
   * It isn't useful to look at the whole blockchain history: if the transaction was confirmed long ago, an attacker
   * will have already claimed all possible outputs and there's nothing we can do about it.
   *
   * @param blockHash_opt hash of a block *after* the output has been spent. If not provided, we will use the blockchain tip.
   * @param txid          id of the transaction output that has been spent.
   * @param outputIndex   index of the transaction output that has been spent.
   * @param limit         maximum number of previous blocks to scan.
   * @return the transaction spending the given output (the future fails if it isn't found within the scanned blocks).
   */
  def lookForSpendingTx(blockHash_opt: Option[BlockHash], txid: TxId, outputIndex: Int, limit: Int)(implicit ec: ExecutionContext): Future[Transaction] = {
    val startBlockId = blockHash_opt match {
      case Some(blockHash) => Future.successful(BlockId(blockHash))
      // NB: bitcoind confusingly returns the blockId instead of the blockHash.
      case None => rpcClient.invoke("getbestblockhash").collect { case JString(blockId) => BlockId(ByteVector32.fromValidHex(blockId)) }
    }
    startBlockId.flatMap(blockId => getBlock(blockId)).flatMap { block =>
      val spentTxId = KotlinUtils.scala2kmp(txid)
      val spendingTx_opt = block.tx.asScala.find(tx => tx.txIn.asScala.exists(txIn => txIn.outPoint.txid == spentTxId && txIn.outPoint.index == outputIndex))
      spendingTx_opt match {
        case Some(tx) => Future.successful(KotlinUtils.kmp2scala(tx))
        // Not in this block: keep looking in the previous block while we are allowed to.
        case None if limit > 0 => lookForSpendingTx(Some(KotlinUtils.kmp2scala(block.header.hashPreviousBlock)), txid, outputIndex, limit - 1)
        case None => Future.failed(new RuntimeException(s"couldn't find tx spending $txid:$outputIndex in the blockchain"))
      }
    }
  }

  /**
   * List wallet transactions using `listtransactions`.
   *
   * @param count number of entries requested from bitcoin core.
   * @param skip  number of entries to skip.
   * @return the entries returned by bitcoin core in reverse order, or an empty list if the answer isn't a json array.
   */
  def listTransactions(count: Int, skip: Int)(implicit ec: ExecutionContext): Future[List[WalletTx]] = {
    def toWalletTx(entry: JValue): WalletTx = {
      val JString(address) = entry \ "address"
      val JDecimal(amount) = entry \ "amount"
      // fee is optional and only included for sent transactions
      val fee = entry \ "fee" match {
        case JDecimal(value) => toSatoshi(value)
        case _ => Satoshi(0)
      }
      val JInt(confirmations) = entry \ "confirmations"
      // while transactions are still in the mempool, blockId will not be included
      val blockId_opt = Some(entry \ "blockhash").collect { case JString(blockId) => BlockId(ByteVector32.fromValidHex(blockId)) }
      val JString(txid) = entry \ "txid"
      val JInt(timestamp) = entry \ "time"
      WalletTx(address, toSatoshi(amount), fee, blockId_opt, confirmations.toLong, TxId.fromValidHex(txid), timestamp.toLong)
    }

    rpcClient.invoke("listtransactions", "*", count, skip).map {
      case JArray(entries) => entries.map(toWalletTx).reverse
      case _ => Nil
    }
  }

  //------------------------- FUNDING  -------------------------//

  /**
   * Ask bitcoin core to fund a transaction (`fundrawtransaction`) and sanity-check its answer. When a check fails, the
   * inputs added by bitcoin core are unlocked before the error is returned.
   *
   * @param tx            transaction to fund.
   * @param options       funding options forwarded to bitcoin core.
   * @param feeBudget_opt max allowed fee, if the transaction returned by bitcoin core has a higher fee a funding error is returned.
   */
  def fundTransaction(tx: Transaction, options: FundTransactionOptions, feeBudget_opt: Option[Satoshi])(implicit ec: ExecutionContext): Future[FundTransactionResponse] = {
    rpcClient.invoke("fundrawtransaction", tx.toString(), options).flatMap { json =>
      val JString(hex) = json \ "hex"
      val JInt(changePos) = json \ "changepos"
      val JDecimal(fee) = json \ "fee"
      val fundedTx = Transaction.read(hex)
      // A negative position means that no change output was added.
      val changePos_opt = Option.when(changePos >= 0)(changePos.intValue)

      val initialInputs = tx.txIn.map(_.outPoint).toSet
      val walletInputs = fundedTx.txIn.map(_.outPoint).toSet -- initialInputs
      val addedOutputs = fundedTx.txOut.size - tx.txOut.size
      val feeSat = toSatoshi(fee)
      val checked = Try {
        require(addedOutputs <= 1, "more than one change output added")
        require(addedOutputs == 0 || changePos >= 0, "change output added, but position not returned")
        require(options.changePosition.isEmpty || changePos_opt.isEmpty || changePos_opt == options.changePosition, "change output added at wrong position")
        for (feeBudget <- feeBudget_opt) {
          require(feeSat <= feeBudget, s"mining fee is higher than budget ($feeSat > $feeBudget)")
        }
        FundTransactionResponse(fundedTx, feeSat, changePos_opt)
      }
      Future.fromTry(checked).recoverWith {
        case error => unlockOutpoints(walletInputs.toSeq).flatMap(_ => Future.failed(error))
      }
    }
  }

  /**
   * Fund a transaction at the given feerate: this builds the bitcoin core funding options from the arguments and
   * delegates to the `fundTransaction` overload taking explicit options.
   *
   * @param externalInputsWeight weights forwarded to bitcoin core as `input_weights` (omitted when empty).
   * @param feeBudget_opt        max allowed fee, forwarded to the underlying funding call.
   */
  def fundTransaction(tx: Transaction, feeRate: FeeratePerKw, replaceable: Boolean = true, changePosition: Option[Int] = None, externalInputsWeight: Map[OutPoint, Long] = Map.empty, minInputConfirmations_opt: Option[Int] = None, feeBudget_opt: Option[Satoshi] = None)(implicit ec: ExecutionContext): Future[FundTransactionResponse] = {
    val scaledFeerate = BigDecimal(feeRate.perKB.toLong).bigDecimal.scaleByPowerOfTen(-8)
    val inputWeights_opt = Option.when(externalInputsWeight.nonEmpty) {
      externalInputsWeight.map { case (outpoint, weight) => InputWeight(outpoint, weight) }.toSeq
    }
    // We must either *always* lock inputs selected for funding or *never* lock them, otherwise locking wouldn't work
    // at all, as the following scenario highlights:
    //  - we fund a transaction for which we don't lock utxos
    //  - we fund another unrelated transaction for which we lock utxos
    //  - the second transaction ends up using the same utxos as the first one
    //  - but the first transaction confirms, invalidating the second one
    // This would break the assumptions of the second transaction: its inputs are locked, so it doesn't expect to
    // potentially be double-spent. This is why locking is driven by the client-wide lockUtxos flag.
    val options = FundTransactionOptions(
      feeRate = scaledFeerate,
      replaceable = replaceable,
      lockUnspents = lockUtxos,
      changePosition = changePosition,
      minconf = minInputConfirmations_opt,
      input_weights = inputWeights_opt,
    )
    fundTransaction(tx, options, feeBudget_opt = feeBudget_opt)
  }

  /**
   * Call `walletprocesspsbt` on the given psbt.
   *
   * @param sign        forwarded to bitcoin core as the second rpc argument.
   * @param sighashType optional sighash flag: the call fails with an IllegalArgumentException if it isn't one of the
   *                    values supported by the rpc.
   * @return the processed psbt and the `complete` flag returned by bitcoin core.
   */
  private def processPsbt(psbt: Psbt, sign: Boolean = true, sighashType: Option[Int] = None)(implicit ec: ExecutionContext): Future[ProcessPsbtResponse] = {
    // This RPC call takes a string for the "sighashtype" parameter with an explicitly limited list of valid values.
    val sighashNames: Map[Int, String] = Map(
      SigHash.SIGHASH_DEFAULT -> "DEFAULT",
      SigHash.SIGHASH_ALL -> "ALL",
      SigHash.SIGHASH_NONE -> "NONE",
      SigHash.SIGHASH_SINGLE -> "SINGLE",
      (SigHash.SIGHASH_ALL | SigHash.SIGHASH_ANYONECANPAY) -> "ALL|ANYONECANPAY",
      (SigHash.SIGHASH_NONE | SigHash.SIGHASH_ANYONECANPAY) -> "NONE|ANYONECANPAY",
      (SigHash.SIGHASH_SINGLE | SigHash.SIGHASH_ANYONECANPAY) -> "SINGLE|ANYONECANPAY",
    )
    sighashType.filterNot(sighashNames.contains) match {
      case Some(_) =>
        Future.failed(new IllegalArgumentException(s"invalid sighash flag $sighashType"))
      case None =>
        val encoded = Base64.getEncoder.encodeToString(Psbt.write(psbt).toByteArray)
        val params = Seq(encoded, sign) ++ sighashType.map(sighashNames).toSeq
        rpcClient.invoke("walletprocesspsbt", params: _*).map { json =>
          val JString(base64) = json \ "psbt"
          val JBool(complete) = json \ "complete"
          val processed = Psbt.read(Base64.getDecoder.decode(base64))
          require(processed.isRight, s"cannot decode processed psbt=$base64")
          ProcessPsbtResponse(processed.getRight, complete)
        }
    }
  }

  /** Call `utxoupdatepsbt` with the base64-encoded psbt and decode the psbt returned by bitcoin core. */
  private def utxoUpdatePsbt(psbt: Psbt)(implicit ec: ExecutionContext): Future[Psbt] = {
    val request = Base64.getEncoder.encodeToString(Psbt.write(psbt).toByteArray)
    for {
      json <- rpcClient.invoke("utxoupdatepsbt", request)
    } yield {
      val JString(base64) = json
      val updated = Psbt.read(Base64.getDecoder.decode(base64))
      require(updated.isRight, s"cannot decode updated psbt=$base64")
      updated.getRight
    }
  }

  /**
   * Run the given action and unlock the provided utxos if it fails.
   *
   * @param locked utxos to unlock when the action fails.
   * @param f      action to run; it is evaluated exactly once.
   * @return the result of the action. On failure the original error is preserved, regardless of the result of the
   *         utxo unlocking call.
   */
  private def unlockIfFails[T](locked: Seq[OutPoint])(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    // `f` is a by-name argument: we evaluate it inside a Try so that an exception thrown synchronously while creating
    // the future is handled like a failed future (utxos are unlocked) instead of escaping with the utxos still locked.
    Future.fromTry(Try(f)).flatten.transformWith {
      // We preserve the original failure, regardless of the result of the utxo unlocking call.
      case Failure(error) => unlockOutpoints(locked).transformWith { _ => Future.failed(error) }
      case Success(result) => Future.successful(result)
    }
  }

  /**
   * Create and sign a funding transaction paying `amount` to `pubkeyScript`, with inputs (and change) added by bitcoin core.
   * The inputs selected by bitcoin core are unlocked if verification or signing fails.
   *
   * @param pubkeyScript  script of the funding output.
   * @param amount        amount of the funding output.
   * @param targetFeerate desired feerate; the mempool minimum feerate is used instead when it is higher.
   * @param feeBudget_opt max allowed fee, forwarded to the funding call.
   * @return the signed funding transaction, the index of the funding output and the fees paid.
   */
  def makeFundingTx(pubkeyScript: ByteVector, amount: Satoshi, targetFeerate: FeeratePerKw, feeBudget_opt: Option[Satoshi] = None)(implicit ec: ExecutionContext): Future[MakeFundingTxResponse] = {

    // Sign the funded transaction and check that the fees and feerate match what bitcoin core told us.
    def verifyAndSign(tx: Transaction, fees: Satoshi, requestedFeeRate: FeeratePerKw): Future[MakeFundingTxResponse] = {
      import KotlinUtils._

      Transactions.findPubKeyScriptIndex(tx, pubkeyScript) match {
        case Left(_) => Future.failed(new RuntimeException("cannot find expected funding output: bitcoin core may be malicious"))
        case Right(fundingOutputIndex) =>
          val ourInputs = tx.txIn.indices.toList // all inputs are supposed to belong to our bitcoin wallet
          val ourOutputs = tx.txOut.indices.toList.filterNot(_ == fundingOutputIndex) // all outputs except for the funding output are supposed to belong to our bitcoin wallet
          val psbt = new Psbt(tx)
          for {
            signed <- signPsbt(psbt, ourInputs, ourOutputs)
            extracted = signed.psbt.extract()
            _ = require(extracted.isRight, s"signing psbt failed with ${extracted.getLeft}")
            actualFees = kmp2scala(signed.psbt.computeFees())
            _ = require(actualFees == fees, s"actual funding fees $actualFees do not match returned fees $fees: bitcoin core may be malicious")
            fundingTx = kmp2scala(extracted.getRight)
            actualFeerate = Transactions.fee2rate(actualFees, fundingTx.weight())
            maxFeerate = requestedFeeRate * 1.5
            // NB: we report the feerate that was actually requested from bitcoin core (it may be the mempool minimum
            // feerate, which can be higher than targetFeerate) since this is the one used in the check.
            _ = require(actualFeerate < maxFeerate, s"actual feerate $actualFeerate is more than 50% above requested feerate $requestedFeeRate")
            _ = logger.debug(s"created funding txid=${fundingTx.txid} outputIndex=$fundingOutputIndex fee=$fees")
          } yield MakeFundingTxResponse(fundingTx, fundingOutputIndex, fees)
      }
    }

    val partialFundingTx = Transaction(
      version = 2,
      txIn = Seq.empty[TxIn],
      txOut = TxOut(amount, pubkeyScript) :: Nil,
      lockTime = 0)

    for {
      // TODO: we should check that mempoolMinFee is not dangerously high
      feerate <- mempoolMinFee().map(minFee => minFee.perKw.max(targetFeerate))
      // we ask bitcoin core to add inputs to the funding tx, and use the specified change address
      FundTransactionResponse(tx, fee, _) <- fundTransaction(partialFundingTx, feerate, feeBudget_opt = feeBudget_opt)
      lockedUtxos = tx.txIn.map(_.outPoint)
      signedTx <- unlockIfFails(lockedUtxos)(verifyAndSign(tx, fee, feerate))
    } yield signedTx
  }

  /**
   * Publish a transaction.
   *
   * @return true if the transaction was published or can be found by bitcoin core after a publishing failure. Otherwise
   *         the transaction is rolled back and false is returned.
   */
  def commit(tx: Transaction)(implicit ec: ExecutionContext): Future[Boolean] = {
    publishTransaction(tx).map(_ => true).recoverWith { case e =>
      logger.warn(s"txid=${tx.txid} error=$e")
      getTransaction(tx.txid)
        .map(_ => true) // tx is in the mempool, we consider that it was published
        .recoverWith {
          // we use transform here because we want to return false in all cases even if rollback fails
          case _ => rollback(tx).transform(_ => Success(false))
        }
    }
  }

  /**
   * Create and publish a child-pays-for-parent transaction to increase the effective feerate of a set of unconfirmed
   * transactions. These unconfirmed transactions must:
   *  - be in our mempool (evicted transactions cannot be used)
   *  - have an output that can be spent by our bitcoin wallet (provided in the outpoints set)
   *  - the total amount of the set of outpoints must be high enough to pay the target feerate
   *
   * @param outpoints     outpoints that should be spent by the CPFP transaction.
   * @param targetFeerate feerate to apply to the package of unconfirmed transactions.
   * @return the published CPFP transaction; failures are reported as IllegalArgumentException.
   */
  def cpfp(outpoints: Set[OutPoint], targetFeerate: FeeratePerKw)(implicit ec: ExecutionContext): Future[Transaction] = {
    import KotlinUtils._

    // Wrap the failure of a single step in an IllegalArgumentException with a descriptive message.
    def explain[T](message: String)(step: Future[T]): Future[T] =
      step.recoverWith { case ex => Future.failed(new IllegalArgumentException(message, ex)) }

    // Build, sign and publish the transaction spending all the outpoints to a single change output.
    def spendToChange(mempoolPackage: Map[TxId, MempoolTx], txOutputs: Map[OutPoint, TxOut], changePubkeyScript: Seq[ScriptElt]): Future[Transaction] = {
      val amountIn = txOutputs.values.map(_.amount).sum
      // We build a transaction spending all the inputs provided to a single change output. Our inputs are using
      // either p2wpkh or p2tr: p2tr inputs are slightly smaller, but we don't bother doing an exact calculation and
      // always use the weight of p2wpkh inputs for simplicity.
      val p2wpkhInputWeight = 272
      val txWeight = p2wpkhInputWeight * outpoints.size + Transaction(2, Nil, Seq(TxOut(amountIn, changePubkeyScript)), 0).weight()
      val totalWeight = mempoolPackage.values.map(_.weight).sum + txWeight
      val targetFees = Transactions.weight2fee(targetFeerate, totalWeight.toInt)
      val currentFees = mempoolPackage.values.map(_.fees).sum
      val missingFees = targetFees - currentFees
      if (missingFees <= 0.sat) {
        Future.failed(new IllegalArgumentException("package feerate is already higher than the target feerate"))
      } else if (amountIn <= missingFees + 660.sat) {
        Future.failed(new IllegalArgumentException("input amount is not sufficient to cover the target feerate"))
      } else {
        val unsignedTx = Transaction(2, outpoints.toSeq.map(o => TxIn(o, Seq.empty, 0)), Seq(TxOut(amountIn - missingFees, changePubkeyScript)), 0)
        explain("tx signing failed")(signPsbt(new Psbt(unsignedTx), unsignedTx.txIn.indices, unsignedTx.txOut.indices)).flatMap { response =>
          response.finalTx_opt match {
            case Left(error) => Future.failed(new IllegalArgumentException(s"tx signing failed: ${error.toString}"))
            case Right(tx) => publishTransaction(tx).map(_ => tx)
          }
        }
      }
    }

    for {
      mempoolPackage <- explain("unable to analyze mempool package: some transactions could not be found in your mempool")(getMempoolPackage(outpoints.map(_.txid)))
      txOutputs <- explain("some outpoints are invalid or cannot be resolved")(getTxOutputs(outpoints))
      changePubkeyScript <- explain("change address generation failed")(getChangePublicKeyScript())
      cpfpTx <- spendToChange(mempoolPackage, txOutputs, changePubkeyScript)
    } yield cpfpTx
  }

  /** Recursively fetch unconfirmed parents and return the complete unconfirmed ancestors tree, starting from the given leaves. */
  def getMempoolPackage(leaves: Set[TxId])(implicit ec: ExecutionContext): Future[Map[TxId, MempoolTx]] = {
    val nothingFetchedYet = Map.empty[TxId, MempoolTx]
    getMempoolPackage(leaves, nothingFetchedYet)
  }

  /**
   * Fetch the given mempool transactions, add them to the transactions already collected, and recurse on their
   * unconfirmed parents that haven't been collected yet.
   *
   * @param leaves  transactions to fetch at this step.
   * @param current transactions collected by the previous steps.
   */
  private def getMempoolPackage(leaves: Set[TxId], current: Map[TxId, MempoolTx])(implicit ec: ExecutionContext): Future[Map[TxId, MempoolTx]] = {
    val fetched = Future.sequence(leaves.map(txid => getMempoolTx(txid)))
    fetched.flatMap { txs =>
      val collected = current ++ txs.map(tx => tx.txid -> tx)
      val missingParents = txs.flatMap(_.unconfirmedParents) -- collected.keySet
      if (missingParents.nonEmpty) {
        getMempoolPackage(missingParents, collected)
      } else {
        Future.successful(collected)
      }
    }
  }

  /**
   * Fetch transaction output details for the given outpoints.
   * The returned future fails with an IllegalArgumentException if an outpoint's index is out of range for its transaction.
   */
  private def getTxOutputs(outpoints: Set[OutPoint])(implicit ec: ExecutionContext): Future[Map[OutPoint, TxOut]] = {
    val parentTxs = Future.sequence(outpoints.map(_.txid).map(txid => getTransaction(txid)))
    parentTxs.flatMap { txs =>
      val resolved = outpoints.flatMap { outpoint =>
        txs.collectFirst {
          case tx if tx.txid == outpoint.txid && outpoint.index < tx.txOut.length => outpoint -> tx.txOut(outpoint.index.toInt)
        }
      }.toMap
      outpoints.find(outpoint => !resolved.contains(outpoint)) match {
        case None => Future.successful(resolved)
        case Some(unresolved) => Future.failed(new IllegalArgumentException(s"invalid outpoint $unresolved"))
      }
    }
  }

  //------------------------- SIGNING  -------------------------//

  /**
   * Sign a psbt, either with our on-chain key manager (when one was provided) or with bitcoin core.
   *
   * @param psbt       psbt to sign.
   * @param ourInputs  forwarded to the on-chain key manager (unused when bitcoin core signs).
   * @param ourOutputs forwarded to the on-chain key manager (unused when bitcoin core signs).
   */
  def signPsbt(psbt: Psbt, ourInputs: Seq[Int], ourOutputs: Seq[Int])(implicit ec: ExecutionContext): Future[ProcessPsbtResponse] = {
    // Both signing modes start by asking bitcoin core to update the psbt with utxo data.
    utxoUpdatePsbt(psbt).flatMap { updated =>
      onChainKeyManager_opt match {
        case None =>
          processPsbt(updated, sign = true)
        case Some(keyManager) =>
          // just fill input and output HD paths
          processPsbt(updated, sign = false).flatMap { filled =>
            keyManager.sign(filled.psbt, ourInputs, ourOutputs) match {
              case Failure(error) => Future.failed(error)
              case Success(signedPsbt) => Future.successful(ProcessPsbtResponse(signedPsbt, signedPsbt.extract().isRight))
            }
          }
      }
    }
  }

  //------------------------- PUBLISHING  -------------------------//

  /**
   * Publish a transaction on the bitcoin network.
   *
   * Note that this method is idempotent, meaning that if the tx was already published a long time ago, then this is
   * considered a success even if bitcoin core rejects this new attempt.
   *
   * @return the transaction id (txid)
   */
  def publishTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[TxId] = {
    val published = rpcClient.invoke("sendrawtransaction", tx.toString()).collect {
      case JString(txid) => TxId.fromValidHex(txid)
    }
    published.recoverWith {
      // "transaction already in block chain (code: -27)"
      case JsonRPCError(Error(-27, _)) =>
        Future.successful(tx.txid)
      // "missing inputs (code: -25)": it may be that the tx has already been published and its output spent.
      case missingInputs@JsonRPCError(Error(-25, _)) =>
        getRawTransaction(tx.txid).transform {
          case Success(_) => Success(tx.txid)
          case Failure(_) => Failure(missingInputs)
        }
    }
  }

  /**
   * Publish a 1-parent-1-child transaction package, which allows replacing a conflicting parent transaction that has
   * the same (or a higher) feerate by leveraging CPFP. The child transaction cannot have other unconfirmed parents.
   *
   * @return the id of the child transaction; the future fails with an IllegalArgumentException carrying the error
   *         reported for the child (or else the parent) when the package isn't accepted.
   */
  def publishPackage(parentTx: Transaction, childTx: Transaction)(implicit ec: ExecutionContext): Future[TxId] = {
    val rawPackage = Seq(parentTx, childTx).map(_.toString())
    rpcClient.invoke("submitpackage", rawPackage).flatMap { json =>
      // Error reported by bitcoin core for a given transaction of the package, if any.
      def errorFor(tx: Transaction): Option[String] = (json \ "tx-results" \ tx.wtxid.toHex \ "error").extractOpt[String]

      val JString(msg) = json \ "package_msg"
      if (msg != "success") {
        val reason = errorFor(childTx).orElse(errorFor(parentTx)).getOrElse("unknown failure")
        Future.failed(new IllegalArgumentException(reason))
      } else {
        // All transactions were accepted into or are already in the mempool.
        Future.successful(childTx.txid)
      }
    }
  }

  /** Call `abandontransaction`: returns true if the call succeeded and false if it failed (the future never fails). */
  override def abandon(txId: TxId)(implicit ec: ExecutionContext): Future[Boolean] = {
    rpcClient.invoke("abandontransaction", txId).transform(result => Success(result.isSuccess))
  }

  /** List all outpoints that are currently locked (as reported by `listlockunspent`). */
  def listLockedOutpoints()(implicit ec: ExecutionContext): Future[Set[OutPoint]] = {
    def toOutPoint(lock: JValue): OutPoint = {
      val JString(txid) = lock \ "txid"
      val JInt(vout) = lock \ "vout"
      OutPoint(TxId.fromValidHex(txid), vout.toInt)
    }

    rpcClient.invoke("listlockunspent").collect {
      case JArray(locks) => locks.map(toOutPoint).toSet
    }
  }

  /**
   * Unlock a set of utxos.
   *
   * @param outPoints outpoints to unlock.
   * @return true if all outpoints were successfully unlocked, false otherwise.
   */
  def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
    // Unlock a single utxo: the returned future never fails, errors are mapped to a boolean result.
    def unlock(outPoint: OutPoint): Future[Boolean] = {
      val utxo = OutpointArg(outPoint.txid, outPoint.index)
      rpcClient
        .invoke("lockunspent", true, List(utxo))
        .mapTo[JBool]
        .transformWith {
          case Success(JBool(unlocked)) => Future.successful(unlocked)
          // we consider that the outpoint was successfully unlocked (since it was not locked to begin with)
          case Failure(JsonRPCError(error)) if error.message.contains("expected locked output") => Future.successful(true)
          case Failure(_) => Future.successful(false)
        }
    }

    // we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
    val results = Future.sequence(outPoints.map(outPoint => unlock(outPoint)))
    // return true if all outpoints were unlocked false otherwise
    results.map(_.forall(identity))
  }

  /** Unlock all the utxos used as inputs of the given transaction. */
  def rollback(tx: Transaction)(implicit ec: ExecutionContext): Future[Boolean] = {
    val spentOutpoints = tx.txIn.map(_.outPoint)
    unlockOutpoints(spentOutpoints)
  }

  //------------------------- ADDRESSES  -------------------------//

  /** Read the `mine.trusted` and `mine.untrusted_pending` amounts returned by `getbalances`. */
  def onChainBalance()(implicit ec: ExecutionContext): Future[OnChainBalance] =
    for {
      balances <- rpcClient.invoke("getbalances")
    } yield {
      val mine = balances \ "mine"
      val JDecimal(confirmed) = mine \ "trusted"
      val JDecimal(unconfirmed) = mine \ "untrusted_pending"
      OnChainBalance(toSatoshi(confirmed), toSatoshi(unconfirmed))
    }

  /**
   * Retrieve the public key that bitcoin core associates with one of our addresses (using `getaddressinfo`).
   *
   * When we manage private keys ourselves, we also check that we can re-compute the public key and address we got from
   * bitcoin core from the returned key path.
   *
   * @param address address to look up.
   * @return the public key, or a failed future (RuntimeException) if our key manager derives something different.
   */
  private def extractPublicKey(address: String)(implicit ec: ExecutionContext): Future[PublicKey] = {
    // NB: we must not use `return` here: this code runs asynchronously, after this method has already returned, so the
    // mismatch must be reported through the future itself.
    rpcClient.invoke("getaddressinfo", address).flatMap { addressInfo =>
      val JString(keyPath) = addressInfo \ "hdkeypath"
      val JString(rawKey) = addressInfo \ "pubkey"
      val extracted = PublicKey(ByteVector.fromValidHex(rawKey))
      onChainKeyManager_opt match {
        case Some(keyManager) =>
          // check that the address and public key match what we derive ourselves
          val computed = keyManager.derivePublicKey(DeterministicWallet.KeyPath(keyPath))
          if (computed != (extracted, address)) {
            Future.failed(new RuntimeException("cannot recompute pubkey generated by bitcoin core"))
          } else {
            Future.successful(extracted)
          }
        case None => Future.successful(extracted)
      }
    }
  }

  /**
   * @param addressType_opt optional address type: if not specified, then the default address type configured with the
   *                        bitcoin node's `addresstype` option will be used.
   * @return a new receive address
   */
  def getReceiveAddress(addressType_opt: Option[AddressType] = None)(implicit ec: ExecutionContext): Future[String] = {
    val bitcoinCoreAddressType = addressType_opt.map(_.bitcoinCoreName)
    // The empty string passed as first argument is the (unused) address label.
    val newAddress = for {
      JString(address) <- rpcClient.invoke("getnewaddress", "", bitcoinCoreAddressType)
    } yield address
    // Only hand out addresses that pass our own verification.
    newAddress.flatMap(address => verifyAddress(address))
  }

  /** Check that when we manage private keys we can re-compute the address we got from Bitcoin Core. */
  private def verifyAddress(address: String)(implicit ec: ExecutionContext): Future[String] = {
    // Without a key manager there is nothing to compare against: the address is accepted as-is.
    onChainKeyManager_opt.fold(Future.successful(address)) { keyManager =>
      rpcClient.invoke("getaddressinfo", address).flatMap { addressInfo =>
        val JString(keyPath) = addressInfo \ "hdkeypath"
        // Only the address part of the derivation result is needed here.
        val (_, recomputedAddress) = keyManager.derivePublicKey(DeterministicWallet.KeyPath(keyPath))
        if (recomputedAddress == address) {
          Future.successful(address)
        } else {
          Future.failed(new RuntimeException("cannot recompute address generated by bitcoin core"))
        }
      }
    }
  }

  /**
   * Generate a new receive address (see [[getReceiveAddress]]) and convert it to its public key script.
   *
   * @param addressType_opt optional address type, forwarded to [[getReceiveAddress]].
   * @return the public key script of the new address, or a failed future if the address cannot be converted.
   */
  def getReceivePublicKeyScript(addressType_opt: Option[AddressType] = None)(implicit ec: ExecutionContext): Future[Seq[ScriptElt]] = {
    getReceiveAddress(addressType_opt).flatMap { address =>
      addressToPublicKeyScript(rpcClient.chainHash, address).fold(
        f => Future.failed(new RuntimeException(s"cannot convert $address to a public key script: ${f.getMessage}")),
        script => Future.successful(script)
      )
    }
  }

  /**
   * Ask Bitcoin Core for a new change address with the `getrawchangeaddress` RPC and verify it.
   *
   * @param addressType_opt optional address type: when provided, its Bitcoin Core name is passed to the RPC call.
   * @return a new change address.
   */
  def getChangeAddress(addressType_opt: Option[AddressType] = None)(implicit ec: ExecutionContext): Future[String] = {
    val bitcoinCoreAddressType = addressType_opt.map(_.bitcoinCoreName)
    val newAddress = for {
      JString(address) <- rpcClient.invoke("getrawchangeaddress", bitcoinCoreAddressType)
    } yield address
    // Only hand out addresses that pass our own verification.
    newAddress.flatMap(address => verifyAddress(address))
  }

  /**
   * Generate a new change address (see [[getChangeAddress]]) and convert it to its public key script.
   *
   * @param addressType_opt optional address type, forwarded to [[getChangeAddress]].
   * @return the public key script of the new address, or a failed future if the address cannot be converted.
   */
  def getChangePublicKeyScript(addressType_opt: Option[AddressType] = None)(implicit ec: ExecutionContext): Future[Seq[ScriptElt]] = {
    getChangeAddress(addressType_opt).flatMap { address =>
      addressToPublicKeyScript(rpcClient.chainHash, address).fold(
        f => Future.failed(new RuntimeException(s"cannot convert $address to a public key script: ${f.getMessage}")),
        script => Future.successful(script)
      )
    }
  }

  /**
   * Generate a new p2wpkh receive address and return the public key Bitcoin Core reports for it.
   *
   * @return the public key matching a freshly generated p2wpkh address.
   */
  def getP2wpkhPubkey()(implicit ec: ExecutionContext): Future[Crypto.PublicKey] =
    getReceiveAddress(Some(P2wpkh)).flatMap(address => extractPublicKey(address))

  /**
   * Ask Bitcoin Core to fund and broadcast a tx that sends funds to a given pubkey script.
   * If the current wallet uses Eclair to sign transactions, then we'll use our on-chain key manager to sign the transaction,
   * with the following assumptions:
   * - all inputs belong to us
   * - all outputs except for the one that sends to `pubkeyScript` belong to us
   *
   * @param pubkeyScript public key script to send funds to
   * @param amount       amount to send
   * @param feeratePerKw fee rate
   * @return the txid of the sending tx.
   */
  def sendToPubkeyScript(pubkeyScript: ByteVector, amount: Satoshi, feeratePerKw: FeeratePerKw)(implicit ec: ExecutionContext): Future[TxId] = {
    import KotlinUtils._

    val theirOutput = TxOut(amount, pubkeyScript)
    // We start from a transaction that only contains the recipient's output and let the wallet add inputs (and change).
    val tx = Transaction(version = 2, txIn = Nil, txOut = theirOutput :: Nil, lockTime = 0)
    for {
      fundedTx <- fundTransaction(tx, feeratePerKw, replaceable = true)
      // From this point on, every failure must go through unlockIfFails, otherwise the inputs selected by
      // fundTransaction would stay locked.
      lockedOutputs = fundedTx.tx.txIn.map(_.outPoint)
      theirOutputPos = fundedTx.tx.txOut.indexOf(theirOutput)
      // We declare all inputs and all outputs except the recipient's as ours.
      signedPsbt <- unlockIfFails(lockedOutputs)(signPsbt(new Psbt(fundedTx.tx), fundedTx.tx.txIn.indices, fundedTx.tx.txOut.indices.filterNot(_ == theirOutputPos)))
      // The sanity checks below are wrapped in a Future so that a failed check also unlocks the inputs.
      signedTx <- unlockIfFails(lockedOutputs)(Future.fromTry(Try {
        require(signedPsbt.finalTx_opt.isRight, s"transaction was not fully signed (${signedPsbt.finalTx_opt.left.toOption.get}): bitcoin core may be malicious")
        val finalTx = signedPsbt.finalTx_opt.toOption.get
        val actualFees = kmp2scala(signedPsbt.psbt.computeFees())
        val actualFeerate = Transactions.fee2rate(actualFees, finalTx.weight())
        val maxFeerate = feeratePerKw * 1.5
        require(actualFeerate < maxFeerate, s"actual feerate $actualFeerate is more than 50% above requested feerate $feeratePerKw")
        finalTx
      }))
      txid <- unlockIfFails(lockedOutputs)(publishTransaction(signedTx))
    } yield txid
  }

  /**
   * Same as the `ByteVector` variant of `sendToPubkeyScript`, for a script provided as a sequence of script elements.
   *
   * @param pubkeyScript public key script to send funds to, serialized with `Script.write` before being forwarded.
   * @param amount       amount to send
   * @param feeratePerKw fee rate
   * @return the txid of the sending tx.
   */
  def sendToPubkeyScript(pubkeyScript: Seq[ScriptElt], amount: Satoshi, feeratePerKw: FeeratePerKw)(implicit ec: ExecutionContext): Future[TxId] = {
    val serializedScript = Script.write(pubkeyScript)
    sendToPubkeyScript(serializedScript, amount, feeratePerKw)
  }

  /** Calls Bitcoin Core's sendtoaddress RPC call directly. Will fail if wallet is using an external signer. */
  def sendToAddress(address: String, amount: Satoshi, confirmationTarget: Long)(implicit ec: ExecutionContext): Future[TxId] = {
    // Positional arguments of the sendtoaddress RPC call, named here for readability.
    val comment = "sent via eclair"
    val commentTo = ""
    val subtractFeeFromAmount = false
    val replaceable = true
    val response = rpcClient.invoke("sendtoaddress", address, amount.toBtc.toBigDecimal, comment, commentTo, subtractFeeFromAmount, replaceable, confirmationTarget)
    // Any response that isn't a string fails the returned future.
    response.collect { case JString(txid) => TxId.fromValidHex(txid) }
  }

  //------------------------- MEMPOOL  -------------------------//

  /**
   * List the transactions returned by the `getrawmempool` RPC.
   *
   * @return the transactions that could be fetched; transactions that cannot be fetched are silently skipped.
   */
  def getMempool()(implicit ec: ExecutionContext): Future[Seq[Transaction]] = {
    // NB: if a transaction is evicted before we've called getTransaction, we need to ignore it instead of failing.
    def fetchIfAvailable(txid: TxId): Future[Option[Transaction]] = getTransaction(txid).map(Some(_)).recover { case _ => None }

    rpcClient.invoke("getrawmempool")
      .map(_.extract[List[String]].map(TxId.fromValidHex))
      .flatMap(txids => Future.sequence(txids.map(fetchIfAvailable)))
      .map(_.flatten)
  }

  /**
   * Fetch mempool details about a transaction with the `getmempoolentry` RPC.
   *
   * @param txid id of the transaction to look up.
   * @return the mempool entry, or a failed future if the RPC call fails or a field doesn't have the expected type.
   */
  def getMempoolTx(txid: TxId)(implicit ec: ExecutionContext): Future[MempoolTx] = {
    for (entry <- rpcClient.invoke("getmempoolentry", txid)) yield {
      val feesEntry = entry \ "fees"
      val JInt(vsize) = entry \ "vsize"
      val JInt(weight) = entry \ "weight"
      val JInt(ancestorCount) = entry \ "ancestorcount"
      val JInt(descendantCount) = entry \ "descendantcount"
      val JDecimal(baseFees) = feesEntry \ "base"
      val JDecimal(ancestorFees) = feesEntry \ "ancestor"
      val JDecimal(descendantFees) = feesEntry \ "descendant"
      val JBool(replaceable) = entry \ "bip125-replaceable"
      val unconfirmedParents = (entry \ "depends").extract[List[String]].map(TxId.fromValidHex).toSet
      MempoolTx(
        txid = txid,
        vsize = vsize.toLong,
        weight = weight.toLong,
        replaceable = replaceable,
        fees = toSatoshi(baseFees),
        // NB: bitcoind counts the transaction itself as its own ancestor and descendant, which is confusing: we fix that by decrementing these counters.
        ancestorCount = ancestorCount.toInt - 1,
        ancestorFees = toSatoshi(ancestorFees),
        descendantCount = descendantCount.toInt - 1,
        descendantFees = toSatoshi(descendantFees),
        unconfirmedParents = unconfirmedParents
      )
    }
  }

  /**
   * Read the `mempoolminfee` field returned by the `getmempoolinfo` RPC.
   *
   * @return the mempool minimum feerate, or a failed future if the field is neither a decimal nor an integer.
   */
  def mempoolMinFee()(implicit ec: ExecutionContext): Future[FeeratePerKB] = {
    for (mempoolInfo <- rpcClient.invoke("getmempoolinfo")) yield {
      // The value may be encoded either as a decimal or as an integer.
      val minFee = mempoolInfo \ "mempoolminfee" match {
        case JDecimal(feerate) => Btc(feerate)
        case JInt(feerate) => Btc(feerate.toLong)
        case other => throw new RuntimeException(s"mempoolminfee failed: $other")
      }
      FeeratePerKB(minFee.toSatoshi)
    }
  }

  //------------------------- BLOCKCHAIN  -------------------------//

  /**
   * Fetch the value returned by the `getblockcount` RPC.
   *
   * @return the block height, or a failed future if the response is not an integer.
   */
  def getBlockHeight()(implicit ec: ExecutionContext): Future[BlockHeight] = {
    val blockCountResponse = rpcClient.invoke("getblockcount")
    blockCountResponse.collect { case JInt(height) => BlockHeight(height.toLong) }
  }

  /**
   * Fetch the id of the block at the given height with the `getblockhash` RPC.
   *
   * @param height block height.
   * @return the block id, or a failed future if the response is not a string.
   */
  def getBlockId(height: Int)(implicit ec: ExecutionContext): Future[BlockId] = {
    val blockHashResponse = rpcClient.invoke("getblockhash", height)
    blockHashResponse.collect {
      // Even though the RPC mentions a block_hash, it is returned encoded as a block_id.
      case JString(hex) => BlockId(ByteVector32.fromValidHex(hex))
    }
  }

  /**
   * Fetch a raw block with the `getblock` RPC and decode it.
   *
   * @param blockId id of the requested block.
   * @return the decoded block, or a failed future if the id of the returned block doesn't match the requested one.
   */
  def getBlock(blockId: BlockId)(implicit ec: ExecutionContext): Future[Block] = {
    val decodedBlock = rpcClient.invoke("getblock", blockId.toString(), 0).collect { case JString(raw) => Block.read(raw) }
    decodedBlock.flatMap { block =>
      // We don't blindly trust bitcoind: the block we decoded must be the one we asked for.
      if (KotlinUtils.kmp2scala(block.blockId) == blockId) {
        Future.successful(block)
      } else {
        Future.failed(new IllegalArgumentException(s"invalid block returned by bitcoind: we requested $blockId, we got ${block.blockId}"))
      }
    }
  }

  /**
   * Look up the transaction output referenced by a channel announcement's short channel id and check whether it is spent.
   *
   * @param c channel announcement to validate.
   * @return a result containing the referenced transaction and the status of its output, or the error encountered
   *         while querying bitcoind.
   */
  def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
    val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)

    // If the block doesn't contain a transaction at the expected index, we fall back to an all-zeroes txid.
    def txIdAtIndex(blockJson: JValue): TxId = Try {
      val JArray(txs) = blockJson \ "tx"
      TxId.fromValidHex(txs(txIndex).extract[String])
    }.getOrElse(TxId(ByteVector32.Zeroes))

    def outputStatus(txid: TxId) = isTransactionOutputSpendable(txid, outputIndex, includeMempool = true).flatMap {
      case true => Future.successful(UtxoStatus.Unspent)
      case false =>
        // if this returns true, it means that the spending tx is *not* in the blockchain
        isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(spendableWithoutMempool => UtxoStatus.Spent(spendingTxConfirmed = !spendableWithoutMempool))
    }

    for {
      blockId <- getBlockId(blockHeight.toInt)
      txid <- rpcClient.invoke("getblock", blockId).map(txIdAtIndex)
      tx <- getRawTransaction(txid)
      fundingTxStatus <- outputStatus(txid)
    } yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
  } recover {
    // Failures are reported inside the result instead of failing the future.
    case t: Throwable => ValidateResult(c, Left(t))
  }

  /**
   * List the wallet's unspent outputs with the `listunspent` RPC, including unconfirmed ones (minconf = 0).
   *
   * @return the wallet's utxos, or a failed future if the response is not an array or a mandatory field is missing.
   */
  def listUnspent()(implicit ec: ExecutionContext): Future[Seq[Utxo]] = rpcClient.invoke("listunspent", /* minconf */ 0).collect {
    case JArray(values) => values.map(utxo => {
      val JInt(vout) = utxo \ "vout"
      // This field is optional in the response.
      val ancestorCount_opt = utxo \ "ancestorcount" match {
        case JInt(ancestorCount) => Some(ancestorCount)
        case _ => None
      }
      val JInt(confirmations) = utxo \ "confirmations"
      val JBool(safe) = utxo \ "safe"
      val JDecimal(amount) = utxo \ "amount"
      val JString(txid) = utxo \ "txid"
      // This field is optional in the response.
      val label_opt = utxo \ "label" match {
        case JString(label) => Some(label)
        case _ => None
      }
      Utxo(
        txid = TxId.fromValidHex(txid),
        outputIndex = vout.toLong,
        // The amount is expressed in BTC: we convert it with exact decimal arithmetic, since going through a
        // Double can introduce rounding errors.
        amount = MilliBtc(amount * 1000),
        ancestorCount_opt = ancestorCount_opt.map(_.toInt),
        confirmations = confirmations.toLong,
        safe = safe,
        label_opt = label_opt
      )
    })
  }

}

object BitcoinCoreClient {

  /**
   * Maximum weight of a transaction input.
   * When funding transactions that contain non-wallet inputs, bitcoind needs this information to compute the total
   * weight of the (funded) transaction and set the fee accordingly.
   *
   * @param txid   id of the transaction containing the spent output, as a string.
   * @param vout   index of the spent output.
   * @param weight weight of the input.
   */
  case class InputWeight(txid: String, vout: Long, weight: Long)

  object InputWeight {
    /** Build an [[InputWeight]] from an outpoint, using the hex encoding of its txid. */
    def apply(outPoint: OutPoint, weight: Long): InputWeight = {
      val txidHex = outPoint.txid.value.toHex
      InputWeight(txidHex, outPoint.index, weight)
    }
  }

  /** Options used when asking bitcoind to fund a transaction. */
  case class FundTransactionOptions(
    feeRate: BigDecimal,
    replaceable: Boolean,
    lockUnspents: Boolean,
    changePosition: Option[Int],
    minconf: Option[Int],
    input_weights: Option[Seq[InputWeight]]
  )

  /**
   * Details about a transaction that is currently in the mempool.
   *
   * @param txid               transaction id.
   * @param vsize              virtual transaction size as defined in BIP 141.
   * @param weight             transaction weight as defined in BIP 141.
   * @param replaceable        whether this transaction could be replaced with RBF (BIP125).
   * @param fees               transaction fees.
   * @param ancestorCount      number of unconfirmed parent transactions.
   * @param ancestorFees       transaction fees for the package consisting of this transaction and its unconfirmed parents.
   * @param descendantCount    number of unconfirmed child transactions.
   * @param descendantFees     transaction fees for the package consisting of this transaction and its unconfirmed children (without its unconfirmed parents).
   * @param unconfirmedParents unconfirmed transactions used as inputs for this transaction.
   */
  case class MempoolTx(
    txid: TxId,
    vsize: Long,
    weight: Long,
    replaceable: Boolean,
    fees: Satoshi,
    ancestorCount: Int,
    ancestorFees: Satoshi,
    descendantCount: Int,
    descendantFees: Satoshi,
    unconfirmedParents: Set[TxId]
  )

  case class WalletTx(
    address: String,
    amount: Satoshi,
    fees: Satoshi,
    blockId_opt: Option[BlockId],
    confirmations: Long,
    txid: TxId,
    timestamp: Long
  )

  /** Outpoint used as RPC argument. */
  case class OutpointArg(txid: TxId, vout: Long)

  /** Unspent transaction output, with the outpoint that references it. */
  case class Utxo(
    txid: TxId,
    outputIndex: Long,
    amount: MilliBtc,
    ancestorCount_opt: Option[Int],
    confirmations: Long,
    safe: Boolean,
    label_opt: Option[String]
  ) {
    val outPoint: OutPoint = OutPoint(txid, outputIndex)
  }

  /** Convert an amount expressed in BTC to satoshis (the decimal point is shifted by 8 digits). */
  def toSatoshi(btcAmount: BigDecimal): Satoshi = {
    val satoshis = btcAmount.bigDecimal.scaleByPowerOfTen(8)
    Satoshi(satoshis.longValue)
  }

  /**
   * Bitcoin Core descriptor, as used in Bitcoin Core's RPC API.
   *
   * @param desc      descriptor string representation.
   * @param internal  true if this descriptor is used to generate change addresses, false if it is used to generate
   *                  receiving addresses; defined only for active descriptors.
   * @param timestamp the creation time of the descriptor.
   * @param active    whether this descriptor is currently used to generate new addresses.
   */
  case class Descriptor(desc: String, internal: Boolean = false, timestamp: Long, active: Boolean = true)

  /**
   * Descriptors for a specific Bitcoin wallet.
   *
   * @param wallet_name wallet name.
   * @param descriptors list of wallet descriptors.
   */
  case class Descriptors(wallet_name: String, descriptors: Seq[Descriptor])

  /**
   * Address type, as managed by Bitcoin Core wallets.
   * We only define values for the types that we actually use.
   */
  sealed trait AddressType {
    /** Name of this address type as used by Bitcoin Core in configuration files and RPC calls. */
    def bitcoinCoreName: String
  }

  object AddressType {
    case object P2wpkh extends AddressType {
      override def bitcoinCoreName: String = "bech32"
    }

    case object P2tr extends AddressType {
      override def bitcoinCoreName: String = "bech32m"
    }
  }

}